希望通过本文的解析,让读者了解Vertx的关键部分的实现原理。对诸如如下问题有一个具象的认识。
- Vertx实例的作用?一个应用是否只对应一个Vertx实例?
- Verticle是一个怎样的存在?
- 本地模式下消息是如何在EventBus上传输和响应的?
- EventBus和EventLoop是如何关联起来的?
概述
Vert.x是一个事件驱动,基于Netty库构建的高性能应用程序框架。实现了所谓的Multi-Reactor模型,能够充分利用多核CPU实现以事件循环为基础的基本编程模型。同时在此基础上构建了Verticle这样类似Actor的概念,以应对并发编程的需求。
Vert.x的核心为EventBus和EventLoop,前者用户消息传输,作为联通各个Handler的神经系统;后者作为任务执行的调度者,保证高性能。任何使用Vert.x构建的应用,都必须围绕这二者作文章。否则就失去了使用它的意义。
核心类
Vertx
Vertx是最为核心的类,创建任何Vertx组件几乎都需要Vertx类的实例。
创建一个单机实例的方法是Vertx.vertx()
,然后就可以使用了。以此为入口,我们看看Vertx在创建时都做了什么。
看继承关系
Vertx是一个接口,VertxImpl是最终实现类,也是唯一的实现类。其中包含了单机和集群两种模式的实现。
1 2 3 4 5 6 7 8 9 10 11
| static VertxImpl vertx(VertxOptions options, Transport transport) { VertxImpl vertx = new VertxImpl(options, transport); vertx.init(); return vertx; }
static void clusteredVertx(VertxOptions options, Transport transport, Handler<AsyncResult<Vertx>> resultHandler) { VertxImpl vertx = new VertxImpl(options, transport); vertx.joinCluster(options, resultHandler); }
|
看Vertx接口的功能
从Vertx接口,看Vertx能干啥。图太长,不方便放,这里只列举核心部分,也是我们用得最多的。
- 创建单机/集群版的Vertx实例
- 创建或获取上下文Context
- 指定特定的Handler运行在当前上下文中
- 获取EventBus
- 获取共享数据
- 设定定时任务
- 发布Verticle
- 执行阻塞方法
如上,Vertx类几乎撑起了所有部分。接着我们看它是如何做到的。
看VertxImpl构造方法
VertxImpl在构造时创建了很多私有对象,具体如下。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43
| private VertxImpl(VertxOptions options, Transport transport) { closeHooks = new CloseHooks(log); checker = new BlockedThreadChecker(options.getBlockedThreadCheckInterval(), options.getBlockedThreadCheckIntervalUnit(), options.getWarningExceptionTime(), options.getWarningExceptionTimeUnit()); maxEventLoopExTime = options.getMaxEventLoopExecuteTime(); maxEventLoopExecTimeUnit = options.getMaxEventLoopExecuteTimeUnit(); eventLoopThreadFactory = new VertxThreadFactory("vert.x-eventloop-thread-", checker, false, maxEventLoopExTime, maxEventLoopExecTimeUnit); eventLoopGroup = transport.eventLoopGroup(Transport.IO_EVENT_LOOP_GROUP, options.getEventLoopPoolSize(), eventLoopThreadFactory, NETTY_IO_RATIO); ThreadFactory acceptorEventLoopThreadFactory = new VertxThreadFactory("vert.x-acceptor-thread-", checker, false, options.getMaxEventLoopExecuteTime(), options.getMaxEventLoopExecuteTimeUnit()); acceptorEventLoopGroup = transport.eventLoopGroup(Transport.ACCEPTOR_EVENT_LOOP_GROUP, 1, acceptorEventLoopThreadFactory, 100); ExecutorService workerExec = new ThreadPoolExecutor(workerPoolSize, workerPoolSize, 0L, TimeUnit.MILLISECONDS, new LinkedTransferQueue<>(), new VertxThreadFactory("vert.x-worker-thread-", checker, true, options.getMaxWorkerExecuteTime(), options.getMaxWorkerExecuteTimeUnit())); PoolMetrics workerPoolMetrics = metrics != null ? metrics.createPoolMetrics("worker", "vert.x-worker-thread", options.getWorkerPoolSize()) : null; workerPool = new WorkerPool(workerExec, workerPoolMetrics); ExecutorService internalBlockingExec = Executors.newFixedThreadPool(options.getInternalBlockingPoolSize(), new VertxThreadFactory("vert.x-internal-blocking-", checker, true, options.getMaxWorkerExecuteTime(), options.getMaxWorkerExecuteTimeUnit())); internalBlockingPool = new WorkerPool(internalBlockingExec, internalBlockingPoolMetrics); this.fileResolver = new FileResolver(options.getFileSystemOptions()); this.addressResolver = new AddressResolver(this, options.getAddressResolverOptions()); this.deploymentManager = new DeploymentManager(this); if (options.getEventBusOptions().isClustered()) { this.clusterManager = getClusterManager(options); this.eventBus = new ClusteredEventBus(this, options, clusterManager); } else { this.clusterManager = null; this.eventBus = new EventBusImpl(this); } this.sharedData = new SharedDataImpl(this, clusterManager); }
|
上面太复杂,整理成思维导图会好看很多。
EventBus用于进行消息传输;
EventLoopGroup为事件循环组,是Netty库中的类,每当有新的任务都会被提交到该组中执行;
而另一个EventLoopGroup——acceptorEventLoopGroup专用于网络服务的创建,目的是避免上面的eventLoopGroup的阻塞造成服务响应不及时;
WorkerPool为单独开的线程池,负责执行阻塞操作;
FileSystem用于操作文件;
AddressResolver用于进行DNS地址解析;
SharedData用于在整个Vertx应用内部共享数据,包括集群模式;
ClusterManager用于进行集群管理;
DeploymentManager和VerticleManager用于发布Verticle,保证Verticle的特性。
所有上述类你可能都不是很熟悉,没关系,先有个印象,下面分析具体场景时会用到。
EventBus
EventBus的继承关系也很简单,其单机版实现类为EventBusImpl,ClusteredEventBus继承自它,除了服务监听和远程调用,均使用了EventBusImpl中的方法。
EventBus的能力,以及EventBusImpl持有对象如下:
出入拦截器自不必说,每次消息进来和出去都会先被拦截器处理;
vertx对象,主要用于获取发送调用代码所处的上线文环境;
handerMap是核心,以地址为key,地址上注册的Handler序列为value,存储了地址-处理器的映射管理;当触发发送动作时,就会到该映射中查找对应的处理器然后执行;对于单机应用,handlerMap就是所有;对于集群应用,则是先找到节点,再在节点中的handlerMap查找对应处理器。
sendNoContext是为了在执行发送的代码块不处于任何上下文时使用的上下文。EventBusImpl创建时使用。
EventBusImpl的构造方法没什么内容,就不提了。
1 2 3 4 5 6
| public EventBusImpl(VertxInternal vertx) { VertxMetrics metrics = vertx.metricsSPI(); this.vertx = vertx; this.metrics = metrics != null ? metrics.createEventBusMetrics() : null; this.sendNoContext = vertx.getOrCreateContext(); }
|
EventLoop
Vertx中并没有EventLoop这个类,它是Netty中的类。对Vertx的源码,与EventLoop相关的交互只有两处:创建EventLoopGroup;向EventLoopGroup提交任务。
具体内容请查找Netty相关资料进行学习。
Context
Context是真正提交任务的地方,凡Vertx中涉及到任务的执行,总是少不了Context的身影。
其核心能力主要在协调代码的运行,同时也可存储数据。其大部分逻辑都在ContextImpl中。其两个子类,仅在自我裁定、任务提交、上下文复制上有所不同。
Verticle
Verticle放在这里有一点另类,因为它并非核心组件。只是Vertx提供的actor模式实现的一个发布单元。它的actor特性由VerticleManager、EventBus、Context等一起保证。就其能力来说,也只有启动和停止两个方法。
从EventBus看Vertx工作原理
一个简单的Vertx应用如下,我们从它开始分析。
1 2 3 4 5 6 7
| fun main() { val vertx = Vertx.vertx(); vertx.eventBus().consumer<String>("helloAddress").handler{ print(it.body()) } vertx.eventBus().send("helloAddress", "hello world!") }
|
Vertx.vertx()在上面已经看过了,它创建了一个VertxImpl对象,持有一堆用于组织工作的属性,包括EventBus。
1 2
| this.eventBus = new EventBusImpl(this);
|
consumer做了什么
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57
| @Override public <T> MessageConsumer<T> consumer(String address, Handler<Message<T>> handler) { Objects.requireNonNull(handler, "handler"); MessageConsumer<T> consumer = consumer(address); consumer.handler(handler); return consumer; }
@Override public <T> MessageConsumer<T> consumer(String address) { checkStarted(); Objects.requireNonNull(address, "address"); return new HandlerRegistration<>(vertx, metrics, this, address, null, false, null, -1); }
public HandlerRegistration(Vertx vertx, EventBusMetrics metrics, EventBusImpl eventBus, String address, String repliedAddress, boolean localOnly, Handler<AsyncResult<Message<T>>> asyncResultHandler, long timeout) { this.vertx = vertx; this.metrics = metrics; this.eventBus = eventBus; this.address = address; this.repliedAddress = repliedAddress; this.localOnly = localOnly; this.asyncResultHandler = asyncResultHandler; if (timeout != -1) { timeoutID = vertx.setTimer(timeout, tid -> { if (metrics != null) { metrics.replyFailure(address, ReplyFailure.TIMEOUT); } sendAsyncResultFailure(new ReplyException(ReplyFailure.TIMEOUT, "Timed out after waiting " + timeout + "(ms) for a reply. address: " + address + ", repliedAddress: " + repliedAddress)); }); } }
@Override public synchronized MessageConsumer<T> handler(Handler<Message<T>> h) { if (h != null) { synchronized (this) { handler = h; if (registered == null) { registered = eventBus.addRegistration(address, this, repliedAddress != null, localOnly); } } return this; } this.unregister(); return this; }
protected <T> HandlerHolder<T> addRegistration(String address, HandlerRegistration<T> registration, boolean replyHandler, boolean localOnly) { Objects.requireNonNull(registration.getHandler(), "handler"); LocalRegistrationResult<T> result = addLocalRegistration(address, registration, replyHandler, localOnly); addRegistration(result.newAddress, address, replyHandler, localOnly, registration::setResult); return result.holder; }
|
要点总结
- consumer方法仅仅将给定的handler注册到EventBusImpl持有的handlerMap中,等待被消费。
send做了什么
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116
| public <T> void sendOrPubInternal(MessageImpl message, DeliveryOptions options, Handler<AsyncResult<Message<T>>> replyHandler) { checkStarted(); HandlerRegistration<T> replyHandlerRegistration = createReplyHandlerRegistration(message, options, replyHandler); OutboundDeliveryContext<T> sendContext = new OutboundDeliveryContext<>(message, options, replyHandlerRegistration); sendContext.next(); }
private <T> HandlerRegistration<T> createReplyHandlerRegistration(MessageImpl message, DeliveryOptions options, Handler<AsyncResult<Message<T>>> replyHandler) { if (replyHandler != null) { long timeout = options.getSendTimeout(); String replyAddress = generateReplyAddress(); message.setReplyAddress(replyAddress); Handler<Message<T>> simpleReplyHandler = convertHandler(replyHandler); HandlerRegistration<T> registration = new HandlerRegistration<>(vertx, metrics, this, replyAddress, message.address, true, replyHandler, timeout); registration.handler(simpleReplyHandler); return registration; } else { return null; } } protected String generateReplyAddress() { return "__vertx.reply." + Long.toString(replySequence.incrementAndGet()); }
@Override public void next() { if (iter.hasNext()) { Handler<DeliveryContext> handler = iter.next(); try { if (handler != null) { handler.handle(this); } else { next(); } } catch (Throwable t) { log.error("Failure in interceptor", t); } } else { if (replierMessage == null) { sendOrPub(this); } else { sendReply(this, replierMessage); } } }
protected ReplyException deliverMessageLocally(MessageImpl msg) { msg.setBus(this); ConcurrentCyclicSequence<HandlerHolder> handlers = handlerMap.get(msg.address()); if (handlers != null) { if (msg.isSend()) { HandlerHolder holder = handlers.next(); if (metrics != null) { metrics.messageReceived(msg.address(), !msg.isSend(), isMessageLocal(msg), holder != null ? 1 : 0); } if (holder != null) { deliverToHandler(msg, holder); Handler<AsyncResult<Void>> handler = msg.writeHandler; if (handler != null) { handler.handle(Future.succeededFuture()); } } } else { if (metrics != null) { metrics.messageReceived(msg.address(), !msg.isSend(), isMessageLocal(msg), handlers.size()); } for (HandlerHolder holder: handlers) { deliverToHandler(msg, holder); } Handler<AsyncResult<Void>> handler = msg.writeHandler; if (handler != null) { handler.handle(Future.succeededFuture()); } } return null; } else { ... ... } }
private <T> void deliverToHandler(MessageImpl msg, HandlerHolder<T> holder) { MessageImpl copied = msg.copyBeforeReceive(); DeliveryContext<T> receiveContext = new InboundDeliveryContext<>(copied, holder);
if (metrics != null) { metrics.scheduleMessage(holder.getHandler().getMetric(), msg.isLocal()); }
holder.getContext().runOnContext((v) -> { try { receiveContext.next(); } finally { if (holder.isReplyHandler()) { holder.getHandler().unregister(); } } }); }
@Override public void next() { if (iter.hasNext()) { } else { holder.getHandler().handle(message); } }
|
要点总结
- send分为两步
- 查询handler,调用send时马上执行,是同步的。
- 执行handler,通过handler注册时的context执行,是异步的。
- 消息响应的实现方式是注册一个响应handler到EventBus中,名为__vertx.reply.xxx,其中xxx为单调递增数字。
- 如果同一地址注册了多个handler,则点对点传输模式下只会取第一个handler进行处理;发布模式下才会执行所有。
- 在一个上下文中注册的handler,不管被执行时机如何,最终都会在该上下文中执行。参见:
holder.getContext().runOnContext(...
,hodler为HandlerHolder对象,在调用consumer注册时保存了注册上下文。
和EventLoop的关系在哪?
通过consumer和send看到了EventBus是如何协调接收和发送的,但并没有看到EventLoop是如何参与的。其实它是有参与的,在holder.getContext().runOnContext(...
是进行了参与。
于是我们看看EventLoopContext.runOnContext(),如下。就是向Context保存的EventLoop对象提交一个任务即可。调度的事,交给Netty来做
1 2 3 4 5 6 7 8 9 10 11 12 13
| @Override public void runOnContext(Handler<Void> task) { try { executeAsync(task); } catch (RejectedExecutionException ignore) { } }
void executeAsync(Handler<Void> task) { nettyEventLoop().execute(() -> executeTask(null, task)); }
|
Verticle工作机制
Vert.x推荐使用Verticle进行开发,它是一个类Actor的模型,具有如下特点。
- 同一Verticle下的所有操作均在一个EventLoop线程上执行。以此避免了线程安全问题。
- Verticle之间通过EventBus进行消息传递
- Verticle具有父子层级关系
一个典型的代码结构如下(官方starter使用Launcher启动的应用,本质上也是通过这种方式启动的)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| class Verticle1 : AbstractVerticle() { override fun start() { println("Verticle 1 started") } }
class Verticle2 : AbstractVerticle() { override fun start() { println("Verticle 2 started") } }
fun main() { val vertx = Vertx.vertx(); vertx.deployVerticle(Verticle1::class.java.canonicalName) vertx.deployVerticle(Verticle2::class.java.canonicalName) }
|
我们需要探究的问题是
- deployVerticle时发生了什么?
- start()和stop()方法什么时候被调用?
- 如何保证一个Verticle下的所有操作都在一个EventLoop线程上执行?
- 父子层级关系如何维持?有什么作用?
要搞清楚这些问题,我们先看几个与此相关的类
Deployment
维护一个发布状态,父子状态也是由它维护的。其唯一实现类DeploymentImpl是作为DeploymentManager的私有内部类存在的。这意味着Verticle发布的所有操作都在DeploymentManager内完成。
其中可能需要解释的点是getVerticles(),这意味着一个Deployment可以有多个Verticle吗?一定程度上是,但仅当一个Verticle需要发布多个实例时,才会存在多个Verticle对象。
其中需要重点关注的方法是io.vertx.core.impl.DeploymentManager.DeploymentImpl#doUndeploy
和io.vertx.core.impl.DeploymentManager.DeploymentImpl#doUndeployChildren
,两个方法递归调用,完成了指定Verticle及其子Verticle的取消。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76
| public synchronized Future<Void> doUndeploy(ContextInternal undeployingContext) { if (status == ST_UNDEPLOYED) { return Future.failedFuture(new IllegalStateException("Already undeployed")); } if (!children.isEmpty()) { status = ST_UNDEPLOYING; return doUndeployChildren(undeployingContext).compose(v -> doUndeploy(undeployingContext)); } else { status = ST_UNDEPLOYED; List<Future> undeployFutures = new ArrayList<>(); if (parent != null) { parent.removeChild(this); } for (VerticleHolder verticleHolder: verticles) { ContextImpl context = verticleHolder.context; Promise p = Promise.promise(); undeployFutures.add(p.future()); context.runOnContext(v -> { Promise<Void> stopPromise = Promise.promise(); Future<Void> stopFuture = stopPromise.future(); stopFuture.setHandler(ar -> { deployments.remove(deploymentID); VertxMetrics metrics = vertx.metricsSPI(); if (metrics != null) { metrics.verticleUndeployed(verticleHolder.verticle); } context.runCloseHooks(ar2 -> { if (ar2.failed()) { log.error("Failed to run close hook", ar2.cause()); } if (ar.succeeded()) { p.complete(); } else if (ar.failed()) { p.fail(ar.cause()); } }); }); try { verticleHolder.verticle.stop(stopPromise); } catch (Throwable t) { if (!stopPromise.tryFail(t)) { undeployingContext.reportException(t); } } }); } Promise<Void> resolvingPromise = undeployingContext.promise(); CompositeFuture.all(undeployFutures).<Void>mapEmpty().setHandler(resolvingPromise); return resolvingPromise.future(); } }
private synchronized Future<Void> doUndeployChildren(ContextInternal undeployingContext) { if (!children.isEmpty()) { List<Future> childFuts = new ArrayList<>(); for (Deployment childDeployment: new HashSet<>(children)) { Promise<Void> p = Promise.promise(); childFuts.add(p.future()); childDeployment.doUndeploy(undeployingContext, ar -> { children.remove(childDeployment); p.handle(ar); }); } return CompositeFuture.all(childFuts).mapEmpty(); } else { return Future.succeededFuture(); } }
|
总结如下
- 一个Verticle被取消,则其所有子Verticle都会被取消
- VerticleHolder中存储了Verticle对应的Context,因此能够保证Verticle的所有生命周期方法都在同一个Context中执行。
DeploymentManager
DeploymentManager专门用于Verticle发布。
重点方法在如下几个
DeploymentManager#doDeploy(DeploymentOptions, Function<Verticle,String>, ContextInternal, ContextInternal,ClassLoader, Callable<io.vertx.core.Verticle>)
DeploymentManager#undeployVerticle(String)
发布
发布代码如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72
| private Future<Deployment> doDeploy(String identifier, DeploymentOptions options, ContextInternal parentContext, ContextInternal callingContext, ClassLoader tccl, Verticle... verticles) { Promise<Deployment> promise = callingContext.promise(); String poolName = options.getWorkerPoolName();
Deployment parent = parentContext.getDeployment(); String deploymentID = generateDeploymentID(); DeploymentImpl deployment = new DeploymentImpl(parent, deploymentID, identifier, options);
AtomicInteger deployCount = new AtomicInteger(); AtomicBoolean failureReported = new AtomicBoolean(); for (Verticle verticle: verticles) { WorkerExecutorInternal workerExec = poolName != null ? vertx.createSharedWorkerExecutor(poolName, options.getWorkerPoolSize(), options.getMaxWorkerExecuteTime(), options.getMaxWorkerExecuteTimeUnit()) : null; WorkerPool pool = workerExec != null ? workerExec.getPool() : null; ContextImpl context = (ContextImpl) (options.isWorker() ? vertx.createWorkerContext(deployment, pool, tccl) : vertx.createEventLoopContext(deployment, pool, tccl)); if (workerExec != null) { context.addCloseHook(workerExec); } deployment.addVerticle(new VerticleHolder(verticle, context)); context.runOnContext(v -> { try { verticle.init(vertx, context); Promise<Void> startPromise = context.promise(); Future<Void> startFuture = startPromise.future(); verticle.start(startPromise); startFuture.setHandler(ar -> { if (ar.succeeded()) { if (parent != null) { if (parent.addChild(deployment)) { deployment.child = true; } else { deployment.undeploy(event -> promise.fail("Verticle deployment failed.Could not be added as child of parent verticle")); return; } } deployments.put(deploymentID, deployment); if (deployCount.incrementAndGet() == verticles.length) { promise.complete(deployment); } } else if (failureReported.compareAndSet(false, true)) { deployment.rollback(callingContext, promise, context, ar.cause()); } }); } catch (Throwable t) { if (failureReported.compareAndSet(false, true)) deployment.rollback(callingContext, promise, context, t); } }); }
return promise.future(); }
|
总结如下
- 对每个verticle,vertx都会创建一个新的Context,因此每个verticle之间是相互独立的(一个Context代表了一个EventLoop线程。)
- 传入init和start方法的vertx实例,是DeploymentManager中维护的,它是在Vertx.vertx()创建时赋予的,整个应用一个。
- 整个verticle的内容都通过Context.runOnContext注册运行,所以它们才会始终都在一个线程上执行,并且执行顺序从上到下,不存在多线程竞争问题。
- 发布完成的Deployment会被加入DeploymentManager维护的deployments映射中,方便进行查找和之后的使用。
取消发布
1 2 3 4 5 6 7 8 9 10 11 12
| public Future<Void> undeployVerticle(String deploymentID) { Deployment deployment = deployments.get(deploymentID); Context currentContext = vertx.getOrCreateContext(); if (deployment == null) { return ((ContextInternal) currentContext).failedFuture(new IllegalStateException("Unknown deployment")); } else { return deployment.undeploy(); } }
|
Deployment.undeploy()在上面介绍Deployment时已介绍。
VerticleManager
DeploymentManager专注于发布,VerticleManager则主要专注于Verticle的创建。其内部持有一个DeploymentManager对象,用于执行实际的发布操作。
该类中有两个主要逻辑
- VerticleFactory的注册、取消、查找等。可以实现自定义的VerticleFactory,这里不深入。
- Verticle的发布和创建的逻辑:调用VerticleFactory创建Verticle实例,在调用DeploymentManager.deploy()发布,代码过长,不给出。
所以Verticle是如何工作的?
这里回答最初提出的四个问题,就能解释Verticle是如何工作的。
deployVerticle时发生了什么?
创建Verticle对象 -> 创建Context并和Verticle对象绑定 -> 构建Deployment并存起来 -> 执行init() -> 执行start() -> 完成
start()和stop()方法什么时候被调用?
start(): 发布时,在新创建的Context上执行。
stop(): 取消发布时,在与该Verticle绑定的Context上执行。
如何保证一个Verticle下的所有操作都在一个EventLoop线程上执行?
通过将Context和Verticle绑定,调用start()和stop()时均在该Context下执行;而在start()和stop()中调用vertx的大多数操作,均是在调用代码块的当前Context下执行,而一个Context始终对应同一个EventLoop线程,如此即能保证一个Verticle下的所有操作都在同一个EventLoop线程上执行。
父子层级关系如何维持?有什么作用?
通过Deployment对象记录并维持。作用在于关闭一个Verticle时,其子Verticle也会被依次关闭。
如此一来,Verticle几乎有了除容错机制外的所有的Actor模型的特性。
数据共享机制
Vertx提供了SharedData组件,用于为整个应用范围内提供共享组件,一个共享Map的使用大概如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| class Verticle1 : AbstractVerticle() { override fun start() { println("Verticle 1 started") vertx.sharedData().getLocalAsyncMap<String, String>("myMap").setHandler { ar -> ar.result().put("你好", "我是Verticle1") } } }
class Verticle2 : AbstractVerticle() { override fun start() { println("Verticle 2 started") vertx.sharedData().getLocalAsyncMap<String, String>("myMap").setHandler { ar -> val value = ar.result().get("你好").result() println(value) } } }
fun main() { val vertx = Vertx.vertx(); vertx.deployVerticle(Verticle1::class.java.canonicalName) Thread.sleep(1000) vertx.deployVerticle(Verticle2::class.java.canonicalName) }
|
所有关于共享数据的内容都在io.vertx.core.shareddata包下,核心类是SharedDataImpl。
提供如下三种数据结构
io.vertx.core.shareddata.impl.LocalAsyncLocks
异步排他锁,在集群内部有效的锁。其实现的思路如下
- 维护一个ConcurrentMap,存储锁名和等待该锁的Handler列表
- 每次新来一个获取锁的请求,向等待列表中加入。并启动定时器开始计算超时,超时后直接回调锁等待超时。
至此加入等待列表的逻辑完成。然后是锁流转逻辑。采用被动的逻辑,非常节省复杂度。
- 当等待列表为空时,来一个请求就将锁给它;列表不为空时,仅加入等待列表,不做尝试获取锁的操作。
- 当一个锁被释放时,再主动将锁给等待列表的下一个请求。这样几乎从来不会出现竞争的情况。
io.vertx.core.shareddata.impl.AsynchronousCounter
计数器,增减都是原子操作
io.vertx.core.shareddata.impl.LocalMapImpl
本地Map,用于单个实例中共享数据。仅是对ConcurrentMap的包装,没有其它特别之处。他的所有操作都是同步的。
io.vertx.core.shareddata.impl.LocalAsyncMapImpl
异步Map,同样是对ConcurrentMap的包装。不同之处在于其value是Holder类,它封装了TTL,实现原理是调用vertx.setTimer设置一个TTL长度的定时器,过期移除。
1 2 3 4 5 6 7 8 9 10
| @Override public void put(K k, V v, long timeout, Handler<AsyncResult<Void>> completionHandler) { long timestamp = System.nanoTime(); long timerId = vertx.setTimer(timeout, l -> removeIfExpired(k)); Holder<V> previous = map.put(k, new Holder<>(v, timerId, timeout, timestamp)); if (previous != null && previous.expires()) { vertx.cancelTimer(previous.timerId); } completionHandler.handle(Future.succeededFuture()); }
|
可能有顾虑设置太多定时器不好,但vertx其实是将定时任务加入eventLoop线程去执行,因此并不会增加额外成本
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| public long setTimer(long delay, Handler<Long> handler) { return scheduleTimeout(getOrCreateContext(), handler, delay, false); } private long scheduleTimeout(ContextImpl context, Handler<Long> handler, long delay, boolean periodic) { if (delay < 1) { throw new IllegalArgumentException("Cannot schedule a timer with delay < 1 ms"); } long timerId = timeoutCounter.getAndIncrement(); InternalTimerHandler task = new InternalTimerHandler(timerId, handler, periodic, delay, context); timeouts.put(timerId, task); context.addCloseHook(task); return timerId; } InternalTimerHandler(long timerID, Handler<Long> runnable, boolean periodic, long delay, ContextImpl context) { this.context = context; this.timerID = timerID; this.handler = runnable; this.periodic = periodic; EventLoop el = context.nettyEventLoop(); if (periodic) { future = el.scheduleAtFixedRate(this, delay, delay, TimeUnit.MILLISECONDS); } else { future = el.schedule(this, delay, TimeUnit.MILLISECONDS); } if (metrics != null) { metrics.timerCreated(timerID); } }
|
框图
有待为每个工作原理都加上框图
总结
Vertx核心为EventBus、EventLoop,以及Verticle。这里通过先展示核心类的能力和实现原理,让读者有一个具象的认识,了解每个核心类大概有能干什么。然后通过EventBus的简单收发分析,展示了EventBus的工作原理及EventLoop参与代码执行的方式;通过Verticle的发布,展示了Verticle是如何运转的,以及Verticle的线程安全特性得到保障的原因;最后展示了SharedData进行应用范围内数据共享的实现原理。让读者对Vert.x核心部分有了较为深入的认识。
当然,Vert.x的能力远不止于此,这里仅介绍了单机版运行原理,它还支持集群和高可用特性,都是本文没有覆盖到的;此外,核心部分的文件系统、网络编程相关内容也均未介绍,这些留待之后再说。
最后,总结一波一些核心组件相互之间的关系。
- 一般来说,一个应用只有一个Vertx,在整个应用中传来传去的vertx实例,都是一个,除非我们想要拥有完全隔离的EventBus。
- 一个Vertx实例只持有一个EventBus和一个用于日常调度的EventLoopGroup(用于网络服务监听的不算)。
- 一个Vertx实例持有多个线程池,我们最常解除的只有EventLoopGroup和WorkerPool。
- 一个Context只持有一个EventLoop,即只对应一个线程。通过runOnContext()将任务调度到该EventLoop上执行。
- 一个VerticleManager持有多个VerticleFactory。
- 一个DeployManager持有多个Deployment,Deployment之间的父子关系由Deployment自己维护。
- 一个Deployment可以持有多个Verticle实例,但仅能持有一个Verticle类型